#include <stdio.h>
#include <string.h>
#include <stdlib.h> 
#include <time.h>
#include <getopt.h>
#include "mssql_common.h"
#include "stpool.h"
#include "msglog.h"

#define STR_LEN 400		/* number of random characters generated (terminator not included) */
#define CHAR_MIN 'a'
#define CHAR_MAX 'z' 	/* inclusive lower/upper bound of every generated character */

/**
  * @brief  Fill a buffer with STR_LEN pseudo-random characters in [CHAR_MIN, CHAR_MAX].
  * @Notes  The C library generator is re-seeded with srand(seed) on every call, so two
  *         calls with the same seed produce the same string. srand()/rand() use
  *         process-wide state, so this also resets the sequence seen by other callers.
  * @param  random_buf, destination buffer; must hold at least STR_LEN + 1 bytes
  * @param  seed, value handed to srand()
  * @retval None.
  */
void random_string(char* random_buf, unsigned long seed)
{
    srand((unsigned int)seed);
    for(int i = 0; i < STR_LEN; i ++){
        random_buf[i] = (char)(rand()%(CHAR_MAX-CHAR_MIN + 1) + CHAR_MIN);
    }
    /* Terminate directly after the last generated character. Writing the
       terminator at STR_LEN + 1 would leave index STR_LEN uninitialised. */
    random_buf[STR_LEN] = '\0';
}

/* Query kinds for a run; stored in MAINTHR_PASSIN_PARA.querytype. */
#define QTYPE_WRITE		1	/* MYAPP_TASK builds an INSERT statement */
#define QTYPE_READ 		2	/* MYAPP_TASK builds a SELECT statement */

/* Run-wide settings filled in by main() and shared, by pointer, with every worker task. */
typedef struct __main_thr_passin_para{
	struct __database_link* dblink;	//database link information
	char  tabname[64];	//target table name (set from the -t/--table option)
	struct __result_set* tabcols;	//set by main() to the result of its column-name query
	int querytype;	//QTYPE_WRITE or QTYPE_READ
}MAINTHR_PASSIN_PARA;

/* Column values of the single row a worker task inserts or looks up. */
typedef struct __table_columns{
	unsigned long idx;	//value used for the "idx" column
	char name[32];	//value used for the "name" column
	char code[STR_LEN+10];	//value used for the "code" column; filled by random_string()
}TAB_COLS;

/* Per-task argument handed to MYAPP_TASK through the thread pool. */
typedef struct __stpool_parameter
{
	unsigned long wkidx;	//worker index
	struct __main_thr_passin_para* mainthrpara;	//shared run-wide settings
	struct __table_columns columns;	//row data generated by TP_PARA_CONSTRUCT()
	MSSQL_CONN conn;	//connection opened and closed inside MYAPP_TASK
}TP_PARA;


/**
  * @brief  Derive the row data of one task parameter from its worker index.
  * @Notes  tppara->wkidx must already be set. idx becomes wkidx + 1, name becomes
  *         "HAH.IT.INFRA.<wkidx + 1>" and code is a random string seeded with wkidx.
  * @param  tppara, task parameter whose columns member is filled in
  * @retval None.
  */
void TP_PARA_CONSTRUCT(struct __stpool_parameter* tppara)
{
	tppara->columns.idx = tppara->wkidx+1;
	/* wkidx is unsigned long, so the conversion must be %lu; the write is bounded
	   to the size of the name buffer so a very large index cannot overflow it. */
	snprintf(tppara->columns.name, sizeof(tppara->columns.name),
			 "HAH.IT.INFRA.%lu", tppara->wkidx+1);
	random_string(tppara->columns.code, tppara->wkidx);
}

/**
  * @brief  Worker task: run one INSERT or SELECT for the row described by the task argument.
  * @Notes  Fixed task signature required by the stpool thread pool.
  *         The SQL text is built and validated before the connection is opened, so a
  *         request that cannot be executed never touches the database.
  *         A failing dbcmd()/dbsqlexec() terminates the whole process via exit().
  * @param  ptsk, stpool task; ptsk->task_arg points to a struct __stpool_parameter
  * @retval None.
  */
void MYAPP_TASK(struct sttask *ptsk)
{
	struct __stpool_parameter* tppara = (struct __stpool_parameter*)(ptsk->task_arg);
	/* Sized for the longest INSERT: 63-char table name, 20-digit idx,
	   31-char name, (STR_LEN + 9)-char code and the fixed SQL text. */
	char mssqlbuf[STR_LEN+256];
	int sqllen = -1;

	if(tppara == NULL || tppara->mainthrpara == NULL){
		fprintf(stderr, "APP_TASK/> {ERROR} task started without parameters.\n");
		return;
	}

	switch(tppara->mainthrpara->querytype){
		case QTYPE_WRITE:
			sqllen = snprintf(mssqlbuf, sizeof(mssqlbuf),
							"INSERT INTO %s (idx, name, code) VALUES (%lu, '%s', '%s');",
							tppara->mainthrpara->tabname,
							tppara->columns.idx, tppara->columns.name, tppara->columns.code);
			break;
		case QTYPE_READ:
			sqllen = snprintf(mssqlbuf, sizeof(mssqlbuf),
							"SELECT name from %s WHERE idx=%lu;",
							tppara->mainthrpara->tabname,
							tppara->columns.idx);
			break;
		default:
			/* No statement to run: do not send an empty command to the server. */
			fprintf(stderr, "APP_TASK/> {ERROR} worker[%lu]: unknown query type [%d].\n",
					tppara->wkidx, tppara->mainthrpara->querytype);
			return;
	}
	if(sqllen < 0 || (size_t)sqllen >= sizeof(mssqlbuf)){
		fprintf(stderr, "APP_TASK/> {ERROR} worker[%lu]: SQL text does not fit the buffer.\n",
				tppara->wkidx);
		return;
	}

	/* NOTE(review): the result of MSSQL_CONN_OPEN is not checked because its
	   failure reporting is not visible from this file - confirm and add a check. */
	MSSQL_CONN_OPEN(&(tppara->conn), tppara->mainthrpara->dblink);

	/* dbcmd() stores the SQL text in the connection handle's command buffer;
	   dbsqlexec() sends it to the server. */
	if(dbcmd(tppara->conn.dbprocess, mssqlbuf) == FAIL ||
	   dbsqlexec(tppara->conn.dbprocess) == FAIL){
		fprintf(stderr, "TDS/> sqlexec error!\n");
		dbclose(tppara->conn.dbprocess);
		exit(EXIT_FAILURE);
	}
	printf("worker[%lu] running...\r", tppara->wkidx);

	dbclose(tppara->conn.dbprocess);
	/* NOTE(review): dbexit() is kept from the original code. It is library-wide
	   cleanup; presumably it balances a dbinit() done inside MSSQL_CONN_OPEN -
	   verify, since it runs concurrently with other workers' connections. */
	dbexit();
}

/**
  * @brief  Queue one MYAPP_TASK per workload on the pool and wait until all have finished.
  * @Notes  The per-task parameter array is allocated here and must stay alive until
  *         stpool_wait_all() returns, which is why it is freed only afterwards.
  *         NOTE(review): the original comment states that a single libstpool pool
  *         accepts at most 10000 queued workloads (and that CentOS 7.5 allows 15066
  *         user threads), so large runs should be split into batches. Batching is
  *         not implemented here - confirm whether it is needed.
  * @param  thrpool, thread pool the tasks are added to
  * @param  total_workloads, number of tasks to create
  * @param  mainthrpara, run-wide settings shared by every task
  * @retval number of tasks accepted by the pool; 0 on invalid arguments or
  *         when the parameter array cannot be allocated
  */
unsigned long activate_workloads(stpool_t *thrpool, unsigned long total_workloads,
								struct __main_thr_passin_para* mainthrpara)
{
	unsigned long processed_workloads = 0;

	if(thrpool == NULL || mainthrpara == NULL || total_workloads == 0){
		fprintf(stderr, "APP_TASK/> {ERROR} no thread pool, parameters or workloads to schedule.\n");
		return 0;
	}

	/* Allocate the task parameters. calloc() rejects a count*size overflow and
	   zero-fills the entries, including the not-yet-opened connection member. */
	struct __stpool_parameter* tppara = calloc(total_workloads, sizeof(*tppara));
	if(tppara == NULL){
		fprintf(stderr, "APP_TASK/> {ERROR} cannot allocate parameters for [%lu] workloads.\n",
				total_workloads);
		return 0;
	}

	for(unsigned long i = 0; i < total_workloads; i++)
	{
		tppara[i].wkidx = i;
		tppara[i].mainthrpara = mainthrpara;
		/* Fill in the row data for this task. */
		TP_PARA_CONSTRUCT(&tppara[i]);
		/* The task receives its parameter through the 5th argument. */
		int rc = stpool_add_routine(thrpool, "workloads processing", MYAPP_TASK, NULL, &tppara[i], NULL);
		if(rc != 0){
			/* Count only tasks the pool actually accepted. */
			fprintf(stderr, "APP_TASK/> {ERROR} workload [%lu] rejected by ThreadPool (code %d).\n", i, rc);
			continue;
		}
		processed_workloads++;
	}
	printf("total [%lu] workloads added to ThreadPool.\n", processed_workloads);

	/* Wait for all workloads' tasks being done completely */
	stpool_wait_all(thrpool, -1);

	printf("APP_TASK/> {INFO} total processed [%lu] data-items.\n", processed_workloads);
	/* Release the task parameters; no task references them any more. */
	free(tppara);

	return processed_workloads;
}

/**
  * @brief  Print the database link parameters and ask the user to confirm them.
  * @Notes  Only the answers "yes", "Y" and "y" count as confirmation. A read
  *         failure (e.g. end of input) is treated as "no".
  * @param  dblink, link parameters to display
  * @param  table, table name to display
  * @retval 1 when the user confirmed, 0 otherwise
  */
int confirm_dblink(struct __database_link dblink, char* table)
{
	char input[8] = "";
	printf("SQL Server Host IP:   [%s]\n", dblink.SQL_SERVER_IP);
	printf("SQL Server Host PORT: [%s]\n", dblink.SQL_SERVER_PORT);
	printf("Database name:        [%s]\n", dblink.db);
	printf("Username:             [%s]\n", dblink.user);
	printf("Table name:           [%s]\n", table);

	printf("Are you confirm these database link parameters?[yes],[no]: ");
	fflush(stdout);	/* the prompt has no newline; make sure it is visible before reading */

	/* The field width keeps the read inside input[8] (7 characters + terminator). */
	if(scanf("%7s", input) != 1){
		return 0;
	}
	return strcmp(input, "yes") == 0 || strcmp(input, "Y") == 0 || strcmp(input, "y") == 0;
}

/* Long-option table for getopt_long(); .val is the matching short option letter.
   Only --help takes no argument. The all-zero entry terminates the table. */
struct option options[] = {
	{ .name = "help",       .has_arg = no_argument,       .flag = NULL, .val = 'h' },
	{ .name = "query",      .has_arg = required_argument, .flag = NULL, .val = 'Q' },
	{ .name = "host",       .has_arg = required_argument, .flag = NULL, .val = 'H' },
	{ .name = "port",       .has_arg = required_argument, .flag = NULL, .val = 'P' },
	{ .name = "user",       .has_arg = required_argument, .flag = NULL, .val = 'u' },
	{ .name = "password",   .has_arg = required_argument, .flag = NULL, .val = 'p' },
	{ .name = "database",   .has_arg = required_argument, .flag = NULL, .val = 'D' },
	{ .name = "table",      .has_arg = required_argument, .flag = NULL, .val = 't' },
	{ .name = "rows",       .has_arg = required_argument, .flag = NULL, .val = 'r' },
	{ .name = "thread-max", .has_arg = required_argument, .flag = NULL, .val = 'x' },
	{ .name = "thread-min", .has_arg = required_argument, .flag = NULL, .val = 'y' },
	{ .name = NULL,         .has_arg = 0,                 .flag = NULL, .val = 0 }
};

/* Usage text printed by main() for -h/--help.
   NOTE(review): the usage example embeds a concrete host address and what looks
   like a real password - confirm these are safe to ship in the help output. */
const char* help_info =
    "    -h , --help        help information\n"
    "    -Q , --query       SQL Query type.[INSERT, SELECT]\n"
    "    -H , --host        SQL Server host IP\n"
    "    -P , --port        SQL Server host PORT\n"
    "    -D , --database    database name\n"
    "    -u , --user        username\n"
    "    -p , --password    password\n"
    "    -t , --table       table name\n"
    "    -r , --rows        number of rows need insert\n"
    "    -x , --thread-max  maximum active threads in POOL\n"
    "    -y , --thread-min  minimum active threads in POOL\n"
    "    --------------------------------------------------------------------------------------\n"
    "    Usage: ./mssqltool --query INSERT -H 172.24.1.122 -P 1433 -D testDB -u sa -p 4aKadedr -t Table_1 -r 10000\n"
    "           ./mssqltool --query INSERT -t Table_1 -r 10000\n"
    "    --------------------------------------------------------------------------------------\n";

/* Largest value accepted for the numeric command-line options (fits in an int). */
#define MAX_NUMERIC_OPTION 1000000000L

/* Parse text as a decimal integer in [1, MAX_NUMERIC_OPTION]; store it in *out and
   return 1 on success, return 0 (leaving *out untouched) on any malformed input. */
static int parse_positive_int(const char *text, int *out)
{
	char *end = NULL;
	long value = strtol(text, &end, 10);

	/* An out-of-range input makes strtol() return LONG_MIN/LONG_MAX, which the
	   range test below rejects as well. */
	if(end == text || *end != '\0' || value <= 0 || value > MAX_NUMERIC_OPTION){
		return 0;
	}
	*out = (int)value;
	return 1;
}

/**
  * @brief  Entry point: parse options, confirm the link, then run the workloads on a thread pool.
  * @Notes  --query and --table are mandatory; the other options fall back to built-in defaults.
  * @param  argc, argv, command line
  * @retval 0 on success, 1 for --help or invalid options, 2 when the user declines
  *         the link parameters, 3 when the thread pool cannot be created
  */
int main(int argc, char* argv[])
{
	struct __database_link dblink;

	/* NOTE(review): the built-in defaults embed a server address and credentials -
	   confirm they are test-only values. */
	DATABASE_LINK_CONSTRUCT(&dblink, "172.24.1.122", 1433,
							"sa", "4aKadedr",
							"testDB");

	/* Zero everything so an omitted option is detectable instead of being read
	   as uninitialised memory. */
	struct __main_thr_passin_para mainthrpara;
	memset(&mainthrpara, 0, sizeof(mainthrpara));
	mainthrpara.dblink = &dblink;

	/* Thread pool setup */
	stpool_t *thrpool;
	long eCAPs;

	MSG_log_set_level(LOG_ERR);
	eCAPs = eCAP_F_DYNAMIC|eCAP_F_SUSPEND|eCAP_F_THROTTLE|eCAP_F_ROUTINE|
			eCAP_F_DISABLEQ|eCAP_F_PRIORITY|eCAP_F_WAIT_ALL;
	int threads_max=500, threads_min=300, numOfWkCnt=5000;

	int n = 0;
	while((n = getopt_long(argc, argv, "hQ:H:P:u:p:D:t:r:x:y:", options, NULL)) >= 0){
		switch(n){
			case 'Q':
				if(strncmp(optarg, "INSERT", 6) == 0){
					mainthrpara.querytype = QTYPE_WRITE;
				}else if(strncmp(optarg, "SELECT", 6) == 0){
					mainthrpara.querytype = QTYPE_READ;
				}else{
					fprintf(stderr, "unknown query type [%s]\n%s", optarg, help_info);
					return 1;
				}
				break;
			/* NOTE(review): the dblink members are declared outside this file, so
			   their capacity is not visible here; these copies are unbounded -
			   confirm the sizes and bound them. */
			case 'H':
				strcpy(dblink.SQL_SERVER_IP, optarg);
				break;
			case 'P':
				strcpy(dblink.SQL_SERVER_PORT, optarg);
				break;
			case 'D':
				strcpy(dblink.db, optarg);
				break;
			case 'u':
				strcpy(dblink.user, optarg);
				break;
			case 'p':
				strcpy(dblink.pass, optarg);
				break;
			case 't':
			{
				int len = snprintf(mainthrpara.tabname, sizeof(mainthrpara.tabname), "%s", optarg);
				if(len < 0 || (size_t)len >= sizeof(mainthrpara.tabname)){
					fprintf(stderr, "table name too long (max %lu characters)\n",
							(unsigned long)(sizeof(mainthrpara.tabname) - 1));
					return 1;
				}
				break;
			}
			case 'r':
				if(!parse_positive_int(optarg, &numOfWkCnt)){
					fprintf(stderr, "invalid --rows value [%s]\n", optarg);
					return 1;
				}
				break;
			case 'x':
				if(!parse_positive_int(optarg, &threads_max)){
					fprintf(stderr, "invalid --thread-max value [%s]\n", optarg);
					return 1;
				}
				break;
			case 'y':
				if(!parse_positive_int(optarg, &threads_min)){
					fprintf(stderr, "invalid --thread-min value [%s]\n", optarg);
					return 1;
				}
				break;
			case 'h':
				printf("%s", help_info);
				return 1;
			default:
				break;
		}
	}

	if(mainthrpara.querytype == 0){
		fprintf(stderr, "missing --query option\n%s", help_info);
		return 1;
	}
	if(mainthrpara.tabname[0] == '\0'){
		fprintf(stderr, "missing --table option\n%s", help_info);
		return 1;
	}
	if(threads_min > threads_max){
		fprintf(stderr, "thread-min [%d] exceeds thread-max [%d]; using [%d] for both\n",
				threads_min, threads_max, threads_max);
		threads_min = threads_max;
	}

	MSSQL_CONN conn;
	if(!confirm_dblink(dblink, mainthrpara.tabname)){
		return 2;
	}
	sprintf(dblink.SQL_SERVER, "%s:%s", dblink.SQL_SERVER_IP, dblink.SQL_SERVER_PORT);
	MSSQL_CONN_OPEN(&conn, &dblink);
	mssql_query_column_name(&conn, mainthrpara.tabname);
	mainthrpara.tabcols = &(conn.result);  /* column information of the target table */

	thrpool = stpool_create("MSSQL-INSERT", eCAPs, threads_max, threads_min, 0, 1);
	if(!thrpool){
		fprintf(stderr, "ThreadPool/> {ERROR} Application-Workers thread pool initialize failed!\n");
		/* Without a pool no workload can run: release what was acquired and stop. */
		MSSQL_CONN_RSTSET_CLEAN(&(conn.result));
		return 3;
	}
	fprintf(stderr, "%s\n", stpool_stat_print(thrpool));

	/* Add the workloads to the pool and wait for them to finish. */
	activate_workloads(thrpool, numOfWkCnt, &mainthrpara);

	/* Release the result-set memory held by the main connection.
	   NOTE(review): the main connection itself is never closed here - confirm
	   whether that is intentional. */
	MSSQL_CONN_RSTSET_CLEAN(&(conn.result));

	/* Release the thread pool. */
	stpool_release(thrpool);
	fprintf(stderr, "ThreadPool/> {INFO} Current Thread-pool release.\n");
	return 0;
}